package com.isyscore.os.metadata.kettle.step;

import com.isyscore.os.metadata.enums.KettleDataFlowNodeType;
import com.isyscore.os.metadata.kettle.base.AbstractStep;
import com.isyscore.os.metadata.kettle.base.FlowConfig;
import com.isyscore.os.metadata.kettle.base.FlowHup;
import com.isyscore.os.metadata.kettle.base.Step;
import com.isyscore.os.metadata.kettle.vis.*;
import com.isyscore.os.metadata.utils.StringEscapeHelper;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.plugins.PluginInterface;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.plugins.StepPluginType;
import org.pentaho.di.trans.TransHopMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaInterface;
import org.pentaho.di.trans.step.errorhandling.StreamInterface;
import org.pentaho.di.trans.steps.filterrows.FilterRowsMeta;
import org.pentaho.di.trans.steps.groupby.GroupByMeta;
import org.pentaho.di.trans.steps.memgroupby.MemoryGroupBy;
import org.pentaho.di.trans.steps.memgroupby.MemoryGroupByMeta;
import org.pentaho.di.trans.steps.tableinput.TableInputMeta;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class GroupByStep extends AbstractStep {

    @Override
    public StepMeta decode(VisNode step, List<DatabaseMeta> databases, TransMeta transMeta, FlowConfig transGraph) throws Exception {
        GroupByNode node = (GroupByNode) step;
        PluginRegistry registry = PluginRegistry.getInstance();
        PluginInterface sp;
        if (node.isInMemory()) {
            sp = registry.findPluginWithId(StepPluginType.class, KettleDataFlowNodeType.MemoryGroupBy.name());
        } else {
            sp = registry.findPluginWithId(StepPluginType.class, KettleDataFlowNodeType.GroupBy.name());
        }

        StepMetaInterface stepMetaInterface = (StepMetaInterface) registry.loadClass(sp);



        String[] groupField = new String[node.getGroupFiled().size()];
        String[] aggregateField = new String[node.getMeasures().size()];//别名
        String[] subjectField = new String[node.getMeasures().size()];//字段名
        int[] aggregateType = new int[node.getMeasures().size()];

        for (int i = 0; i < node.getGroupFiled().size(); i++) {
            groupField[i] = node.getGroupFiled().get(i);
        }

        for (int i = 0; i < node.getMeasures().size(); i++) {
            aggregateField[i] = node.getMeasures().get(i).getMeasureName();
            subjectField[i] = node.getMeasures().get(i).getFieldName();
            aggregateType[i] = Integer.parseInt(node.getMeasures().get(i).getAggregationType().getSymbol());
        }

        if(node.isInMemory()){
            MemoryGroupByMeta  meta =  (MemoryGroupByMeta) stepMetaInterface;
            meta.setGroupField(groupField);
            meta.setAggregateField(aggregateField); //别名
            meta.setSubjectField(subjectField);
            meta.setAggregateType(aggregateType);
            meta.setValueField(new String[node.getMeasures().size()]);
        }else{
            GroupByMeta meta =  (GroupByMeta) stepMetaInterface;
            meta.setGroupField(groupField);
            meta.setAggregateField(aggregateField); //别名
            meta.setSubjectField(subjectField);
            meta.setAggregateType(aggregateType);
            meta.setValueField(new String[node.getMeasures().size()]);
        }




        StepMeta stepMeta = new StepMeta(node.isInMemory() ? KettleDataFlowNodeType.MemoryGroupBy.name() : KettleDataFlowNodeType.GroupBy.name(), node.getNodeName(), stepMetaInterface);
        stepMeta.setDraw(true);
        return stepMeta;
    }

    @Override
    public StepMeta after(VisNode step, List<DatabaseMeta> databases, TransMeta transMeta, FlowConfig transGraph) throws Exception {
        GroupByNode groupByNode = (GroupByNode) step;
        List<FlowHup> transHups = transGraph.getTransHups();
        List<FlowHup> newTransHups = new ArrayList<>();
        String sortStepName = step.getNodeName() + "-默认排序";
        for (FlowHup transHup : transHups) {

            if (transHup.getTo().equals(step.getNodeName())) {
                newTransHups.add(new FlowHup(sortStepName, step.getNodeName()));
                newTransHups.add(new FlowHup(transHup.getFrom(), sortStepName));

                //处理pr节点是多路输出的情况
                dealMultiOut(transMeta, transGraph, sortStepName, transHup);

            } else {
                newTransHups.add(transHup);
            }
        }


        SortRowsStep sortRowsStep = new SortRowsStep();
        SortRowsNode visNode = new SortRowsNode();
        visNode.setNodeName(sortStepName);
        List<SortField> collect = groupByNode.getGroupFiled().stream().map(f -> new SortField(f, true)).collect(Collectors.toList());
        visNode.setSortFields(collect);

        StepMeta sortMeta = sortRowsStep.decode(visNode, databases, transMeta, transGraph);
        sortMeta.setParentTransMeta(transMeta);
        transMeta.getSteps().add(sortMeta);

        List<TransHopMeta> hopMetas = newTransHups.stream().map(h -> new TransHopMeta(getStep(transMeta, h.getFrom()), getStep(transMeta, h.getTo()))).collect(Collectors.toList());
        transMeta.setTransHops(hopMetas);

        transGraph.setTransHups(hopMetas.stream().map(h -> new FlowHup(h.getFromStep().getName(), h.getToStep().getName())).collect(Collectors.toList()));

        return null;
    }

}
